Agentic 工程实践文档
本文档介绍了 ROLL框架 中 Agentic 部分的开发实践经验,包括环境管理器开发协议、GlobalDataset 使用、验证模式配置和轨迹合成功能。
1. EnvManager 开发协议
EnvManager 是 Agentic 框架的核心组件,负责环境的管理和轨迹的生成。开发新的 EnvManager 需要遵循以下协议:
1.1 核心循环机制
EnvManager 必须实现 run_rollout_loop 方法,该方法遵循以下协议:
def run_rollout_loop(self, data: DataProto):
"""
1. 每次调用 run_rollout_loop 时,会持续执行 episode 直到收到数据收集完成的命令
2. 需要重置 seed 以确保所有 group 的一致性
3. episode_id 从 scheduler 端获取
Seed 更新逻辑:
group_seed = base_seed + group_id
episode_seed = group_seed + episode_id
trajectory_id: f"{group_id}_{episode_id}_{episode_seed}"
"""
# 极简调用示例
while self.running:
# 从 scheduler 获取 episode_id
self.episode_id = ray.get(self.output_queue.get_episode_id.remote(self.env_config["group_id"]))
if self.episode_id is None:
break
# 重置环境
rollout_cache = self.reset()
while rollout_cache is not None and not rollout_cache.terminated and not rollout_cache.truncated:
# 做出决策
lm_output = self.make_decision(rollout_cache)
# 执行环境步骤
rollout_cache = self.step(lm_output)
# 提交轨迹
rollout = self.formulate_rollouts(rollout_cache)
ray.get(self.output_queue.put.remote(self.env_config['group_id'], self.episode_id, start_step, rollout))
1.2 EnvManager开发约束
- while loop 无限循环:EnvManager 通过 while 循环持续执行 episode
- 数据集遍历完成才返回 None 退出:当数据集遍历完成时,
reset()方法返回 None,触发循环退出 - 每个 episode 必须有对应的轨迹 put:每个完成的 episode 必须通过
output_queue.put提交轨迹数据
2. GlobalDataset 使用
2.1 设计目的
为了避免每个 env 各自读取数据造成的访存/内存瓶颈,框架层面提供了 GlobalDataset 组件,实现数据集的统一管理和分发。
2.2 类定义和位置
# 位置: roll.datasets.global_dataset.GlobalDataset
@ray.remote
class GlobalDataset:
def __init__(self, dataset_name, split: str = "train", mode="sample", dataset_kwargs: Dict = None):
# mode: "sample" 或 "traversal"
2.3 两种工作模式
Sample Mode(采样模式)
- 用途:训练模式下随机采样数据集
- 特点:每次随机选择数据项
- 配置:
mode="sample"
Traversal Mode(遍历模式)
- 用途:验证模式下需要遍历整个数据集
- 特点:按顺序遍历数据集,确保每个数据项都被访问
- 配置:
mode="traversal"
2.4 核心特性
- 确定性采样:
get_data_item方法保证相同的 seed 返回相同的数据 - 状态管理:内部维护索引状态,支持数据集的重置和遍历
2.5 使用示例
参考 MathEnv、SWEEnv、TerminalBenchEnv 的实现:
class MathEnv(GEMMathEnv):
def __init__(self, dataset_name: str = "", mode: str = "train", **kwargs):
# 转换 train/val mode 为 sample/traversal
global_dataset_mode = "sample" if self.mode == "train" else "traversal"
self.dataset = GlobalDataset.options(
name=f"{self.mode}_{dataset_name}",
get_if_exists=True,
namespace=RAY_NAMESPACE
).remote(
dataset_name=dataset_name,
split=split,
mode=global_dataset_mode
)
# 创建并注册 dataset_manager,这是实现多次val的必要实现
self.dataset_manager = GlobalDatasetManager.options(
name=f"{self.mode}_dataset_manager",
get_if_exists=True,
namespace=RAY_NAMESPACE
).remote()
ray.get(self.dataset_manager.register.remote(
dataset_name=dataset_name,
dataset_ref=self.dataset
))
3. 训练过程中 val 遍历数据集配置
3.1 配置原则
对于需要遍历数据集的场景(如 math/code/swe 等验证场景),需要特殊配置:
- env 定义时设置 mode 参数:val 模式下需要设置
mode=val - 设置 val_batch_size=-1:这样可以遍历整个 val dataset
- env.reset 返回 None 时退出:当数据集遍历完成时,env.reset 会返回 None
3.2 MathEnv 实现参考
位置:roll.pipeline.agentic.env.gem.math_env.MathEnv
class MathEnv(GEMMathEnv):
def __init__(self, mode: str = "train", **kwargs):
# 转换模式
global_dataset_mode = "sample" if self.mode == "train" else "traversal"
self.dataset = GlobalDataset.remote(
dataset_name=dataset_name,
split=split,
mode=global_dataset_mode
)
def reset(self, seed: Optional[None] = None) -> Tuple[str, dict[str, Any]]:
data = ray.get(self.dataset.get_data_item.remote(seed=seed))
if data is None:
return None, None # 数据集遍历完成
# 处理数据...
3.3 YAML 配置示例
rollout_batch_size: 128
val_batch_size: -1 # 遍历整个数据集
deep_math:
env_type: "roll_math"
env_config:
mode: val # 设置为验证模式
dataset_name: data/math_deepmath_deal.jsonl
split: train
question_key: prompt
answer_key: ground_truth
3.4 随机采样类评估场景
对于游戏类等随机采样的评估场景,直接按常规方式配置即可,保证相同的 seed 返回相同的数据。随机采样是默认实现,无需特殊配置。